/*
 * Copyright 2020-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.canal.config.endpoint;

import org.springframework.canal.listener.BatchMessageListener;
import org.springframework.canal.listener.EachMessageListener;
import org.springframework.canal.listener.MessageListener;
import org.springframework.canal.listener.adapter.BatchToEachAdapter;
import org.springframework.canal.listener.adapter.FilteringBatchMessageListenerAdapter;
import org.springframework.canal.listener.adapter.FilteringEachMessageListenerAdapter;
import org.springframework.canal.listener.adapter.MessageFilterStrategy;
import org.springframework.canal.support.converter.record.CanalMessageConverter;
import org.springframework.canal.support.converter.record.EachCanalMessageConverter;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

import java.lang.reflect.Method;
import java.util.Collections;

/**
 * {@link CanalListenerEndpoint} POJO providing the method to invoke to process
 * an incoming message for this endpoint.
 *
 * @param <T> the payload type.
 * @author 橙子
 * @since 2020/11/14
 */
public class CanalListenerMethodEndpoint<T> extends AbstractCanalListenerEndpoint {
    private Object bean;
    private Method listenerMethod;

    public CanalListenerMethodEndpoint(Method listenerMethod, Object bean) {
        this.listenerMethod = listenerMethod;
        this.bean = bean;
    }

    public Object getBean() {
        return this.bean;
    }

    /**
     * Set the object instance that should manage this endpoint.
     *
     * @param bean the target bean instance.
     */
    public void setBean(Object bean) {
        this.bean = bean;
    }

    public Method getListenerMethod() {
        return this.listenerMethod;
    }

    /**
     * Set the method to consume the message handled by method {@link InvocableHandlerMethod}
     * generated by the {@link MessageHandlerMethodFactory}.
     *
     * @param listenerMethod the target method for the {@link #bean}.
     * @see #setMessageHandlerMethodFactory(MessageHandlerMethodFactory)
     */
    public void setListenerMethod(Method listenerMethod) {
        this.listenerMethod = listenerMethod;
    }

    @Override
    public MessageListener<?> createMessageListener(CanalMessageConverter messageConverter, BatchToEachAdapter<?> batchToEachAdapter, MessageFilterStrategy<?> filterStrategy) {
        if (Boolean.TRUE.equals(getBatchListener())) {
            final BatchMessageListener<T> batchMessageListener = createBatchMessageListener(
                    getMessageHandlerMethodFactory().createInvocableHandlerMethod(this.bean, this.listenerMethod),
                    Collections.emptyList(), getErrorHandler(), messageConverter, (BatchToEachAdapter<T>) batchToEachAdapter);
            if (filterStrategy == null)
                return batchMessageListener;
            return new FilteringBatchMessageListenerAdapter<>(batchMessageListener, (MessageFilterStrategy<T>) filterStrategy);
        }
        final EachMessageListener<T> eachMessageListener = createEachMessageListener(getMessageHandlerMethodFactory().createInvocableHandlerMethod(this.bean, this.listenerMethod),
                Collections.emptyList(), getErrorHandler(), messageConverter instanceof EachCanalMessageConverter ? (EachCanalMessageConverter<T>) messageConverter : null);
        if (filterStrategy == null)
            return eachMessageListener;
        return new FilteringEachMessageListenerAdapter<>(eachMessageListener, (MessageFilterStrategy<T>) filterStrategy);
    }

}
